package drds.data_propagate.parse.multistage_coordinator;

import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.LifecycleAware;
import drds.data_propagate.binlog_event.BinLogEvent;
import drds.data_propagate.parse.DumperImpl;
import drds.data_propagate.parse.MessageEvent;
import drds.data_propagate.parse.exception.ParseException;

public class SinkStoreStageHandler implements EventHandler<MessageEvent>, LifecycleAware {

    private MultistageCoordinatorImpl multiStageCoprocessor;

    public SinkStoreStageHandler(MultistageCoordinatorImpl multiStageCoprocessor) {
        this.multiStageCoprocessor = multiStageCoprocessor;
    }

    public void onEvent(MessageEvent messageEvent, long index, boolean endOfBatch) throws Exception {
        try {
            if (messageEvent.getEntry() != null) {
                multiStageCoprocessor.eventList.add(messageEvent.getEntry());
            }

            BinLogEvent binLogEvent = messageEvent.getBinLogEvent();
            if (multiStageCoprocessor.dumper instanceof DumperImpl && binLogEvent.getSemiValue() == 1) {
                // semi ack回报
                ((DumperImpl) multiStageCoprocessor.dumper).sendSemiAckPacket(binLogEvent.getHeader().getLogFileName(), binLogEvent.getHeader().getNextBinLogEventPosition());
            }
            // clear for gc
            messageEvent.setBuffer(null);
            messageEvent.setBinLogEvent(null);
            messageEvent.setTableMetaData(null);
            messageEvent.setEntry(null);
            messageEvent.setNeedDmlParse(false);
        } catch (Throwable e) {
            multiStageCoprocessor.exception = new ParseException(e);
            throw multiStageCoprocessor.exception;
        }
    }

    @Override
    public void onStart() {

    }

    @Override
    public void onShutdown() {

    }
}
